package com.brian.rabbitmq.service;

import com.brian.rabbitmq.util.SerializeUtil;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @program: architect  短信消费者
 * @author: Brian Huang
 * @create: 2019-07-14 22
 **/
@Component
public class SmsConsumer {

    @Autowired
    private MessageIdService messageIdService;


    //@RabbitHandler
    //@RabbitListener(queues = "fanout_sms_queue")
    public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
        String messageId = message.getMessageProperties().getMessageId();
        String id = messageIdService.getId(messageId);

        if(StringUtils.isEmpty(id)){
            System.out.println("短信消费者已经消费该消息");
            return;
        }
        String messageEntity = SerializeUtil.Byte2String(message.getBody());
        System.out.println("短信消费者收到消息：" + messageEntity);
        //模拟调用短信接口
        double result = Math.random() * 10;
        if(result > 7){
                throw  new Exception("调用第三方短信接口短信接口失败");
        }

        // 手动ack
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        // 手动签收
        channel.basicAck(deliveryTag, false);

        messageIdService.deleteId(id);

        //TODO 怎么获取到到当前消息retry次数
    }

    @RabbitHandler
    @RabbitListener(queues = "fanout_sms_queue")
    public void deadProcess(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
        String messageId = message.getMessageProperties().getMessageId();
        String id = messageIdService.getId(messageId);

        if(StringUtils.isEmpty(id)){
            System.out.println("短信消费者已经消费该消息");
            return;
        }

        String messageEntity = SerializeUtil.Byte2String(message.getBody());
        System.out.println("短信消费者收到消息：" + messageEntity);
        //模拟调用短信接口
        double result = Math.random() * 10;
        if(result > 3){

            try {
                throw  new Exception("调用第三方短信接口短信接口失败");
            } catch (Exception e) {
                e.printStackTrace();
                // 丢弃该消息
               channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);

            }
        }else{
            // 手动ack
            Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
            // 手动签收
            channel.basicAck(deliveryTag, false);
        }
        messageIdService.deleteId(id);

    }
}
